Out of Memory (OOM)

Spark applications are easy to write and easy to understand when everything goes according to plan. Diagnosing them becomes much harder when they start to slow down or fail. Sometimes a well-tuned application fails because the data or its layout has changed. Sometimes an application that was running well starts behaving badly because of resource starvation. Spark’s architecture is memory-centric, so many of these failures show up as out-of-memory (OOM) errors. Some of the most common causes of OOM are:

  • Incorrect usage of Spark  
  • High concurrency 
  • Inefficient queries 
  • Incorrect configuration

To avoid these problems, we need a basic understanding of Spark and of our data. Certain measures can either prevent OOM or rectify an application that has failed because of it. Spark’s default configuration may or may not be sufficient or accurate for your applications, and, as noted above, even a well-tuned application may fail with OOM when the underlying data changes. Out-of-memory issues can be observed on the driver node, on executor nodes, and sometimes even on the node manager. Let’s take a look at each case.

Out of Memory at the Driver Level

A driver in Spark is the JVM where the application’s main control flow runs. More often than not, the driver fails with an OutOfMemory error because of incorrect usage of Spark. Spark is an engine for distributing workload among worker machines, and the driver should be considered only an orchestrator. In typical deployments, a driver is provisioned with less memory than the executors. Hence, we should be careful about what we do on the driver.

Common causes which result in driver OOM are:

  • rdd.collect()
  • sparkContext.broadcast
  • Driver memory configured too low for the application's requirements.
  • Misconfiguration of spark.sql.autoBroadcastJoinThreshold. Spark uses this limit to broadcast a relation to all the nodes in case of a join operation. At the very first usage, the whole relation is materialized at the driver node. Sometimes multiple tables are also broadcast as part of the query execution.

Try to write your application in such a way that you can avoid explicit result collection at the driver level. You can very well delegate this task to one of the executors. For example, if you want to save the results to a particular file, either you can collect it at the driver or assign an executor to do that for you. 

// Inefficient code: collect() pulls every row into the driver's heap
val result = dataFrame.collect()
saveToFile(result) // saveToFile stands in for any driver-side writer

// Better code: repartition(1) funnels the rows through a single executor task,
// which writes the output (assuming executors are better provisioned than the driver)
dataFrame.repartition(1).write.csv("/file/path")

If you are using Spark SQL and the driver runs out of memory while broadcasting relations, you can either increase the driver memory (if possible) or reduce the spark.sql.autoBroadcastJoinThreshold value so that your join operations use the more memory-friendly sort-merge join.
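The threshold can be changed at runtime on the session. The following is a minimal sketch, assuming Spark SQL is on the classpath; the object name, the application name, and the `local[*]` master are illustrative choices so the sketch runs without a cluster. Spark documents the default threshold as 10 MB (10485760 bytes), and a value of -1 disables automatic broadcast joins.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; the names are not from the original article.
object BroadcastThresholdSketch {
  def main(args: Array[String]): Unit = {
    // local[*] is used only so the sketch runs on a single machine.
    val spark = SparkSession.builder()
      .appName("broadcast-threshold-sketch")
      .master("local[*]")
      .getOrCreate()

    // Lower the limit to 1 MB so that only small relations are broadcast
    // and therefore materialized on the driver first.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1L * 1024 * 1024)

    // Alternatively, -1 turns automatic broadcast joins off entirely,
    // so the planner falls back to other strategies such as sort-merge join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```

Lowering the threshold trades some join speed for a smaller driver footprint, so it is worth checking the query plan before and after the change.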

Out of Memory at the Executor Level

This is a very common issue with Spark applications which may be due to various reasons. Some of the most common reasons are high concurrency, inefficient queries, and incorrect configuration. Let’s look at each in turn.

High Concurrency
Before understanding why high concurrency might be a cause of OOM, let’s try to understand how Spark executes a query or job and which components contribute to memory consumption.

Spark jobs or queries are broken down into multiple stages, and each stage is further divided into tasks. The number of tasks depends on various factors, such as which stage is being executed and which data source is being read. If it’s a map stage (the scan phase in SQL), the partitions of the underlying data source are typically honored.

For example, if a Hive ORC table has 2000 partitions, then 2000 tasks are created for the map stage that reads the table, assuming partition pruning does not come into play. If it’s a reduce stage (shuffle stage), Spark uses either the spark.default.parallelism setting (for RDDs) or spark.sql.shuffle.partitions (for DataFrames and Datasets) to determine the number of tasks. How many tasks run in parallel on each executor depends on the spark.executor.cores property. If this is set high without due consideration of the memory each task requires, executors may fail with OOM. Now let’s see what happens under the hood while a task is executing, and some probable causes of OOM.

Let’s say we are executing a map task, or the scan phase of a SQL query, over an HDFS file or a Parquet/ORC table. For HDFS files, each Spark task reads one 128 MB block of data. So if 10 tasks run in parallel, at least 128 MB × 10 = 1,280 MB is needed just to hold the partition data. This ignores compression: once decompressed, the data can be significantly larger, depending on the compression algorithm.
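The arithmetic above can be written as a small helper. This is a rough sketch in plain Scala with no Spark dependency; `ScanMemory`, `minScanMemoryMb`, and the `expansionFactor` parameter are hypothetical names introduced here, and the expansion factor of 3.0 in the usage note is an assumed value, not a measurement.

```scala
// Hypothetical estimator, not a Spark API.
object ScanMemory {
  // blockSizeMb:     size of the input block each task reads
  // concurrentTasks: number of tasks running at the same time
  // expansionFactor: assumed growth after decompression (1.0 means none)
  def minScanMemoryMb(blockSizeMb: Long,
                      concurrentTasks: Int,
                      expansionFactor: Double = 1.0): Double =
    blockSizeMb * concurrentTasks * expansionFactor
}
```

With the figures from the text, `ScanMemory.minScanMemoryMb(128, 10)` gives 1280.0 MB. If the data were assumed to triple in size once decompressed, `ScanMemory.minScanMemoryMb(128, 10, 3.0)` gives 3840.0 MB.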

Spark reads Parquet in a vectorized format. To put it simply, with each task, Spark reads data from the Parquet file, batch by batch. As Parquet is columnar, these batches are constructed for each of the columns. It accumulates a certain amount of column data in memory before executing any operation on that column. This means Spark needs some data structures and bookkeeping to store that much data. Also, encoding techniques like dictionary encoding have some state saved in memory. All of them require memory. 

[Figure: Apache Spark architecture]

So, with more concurrency the overhead increases. Also, if there is a broadcast join involved, then the broadcast variables will also take some memory. The above diagram shows a simple case where each executor is executing two tasks in parallel.
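The effect of concurrency on each task's share of memory can be sketched with simple arithmetic. The plain-Scala helper below is an illustration only: `TaskSlots` and its methods are hypothetical names, the even split of the heap is a simplification (Spark does not divide memory this rigidly), and the cluster sizes in the usage note are assumed values.

```scala
// Hypothetical helper, not part of Spark: estimates task concurrency.
object TaskSlots {
  // Tasks that can run at the same time across the whole application.
  def parallelSlots(numExecutors: Int, executorCores: Int): Int =
    numExecutors * executorCores

  // Number of "waves" needed to run every task of a stage (ceiling division).
  def waves(numTasks: Int, slots: Int): Int =
    (numTasks + slots - 1) / slots

  // Naive per-task heap share in MiB if the executor heap were split evenly.
  def heapPerTaskMiB(executorMemoryMiB: Long, executorCores: Int): Long =
    executorMemoryMiB / executorCores
}
```

For the 2000-task scan mentioned earlier, 10 executors with 4 cores each give `TaskSlots.parallelSlots(10, 4)` = 40 slots, so the stage runs in `TaskSlots.waves(2000, 40)` = 50 waves. With an 8192 MiB heap, each of the 4 concurrent tasks gets roughly 2048 MiB; raising spark.executor.cores to 8 without adding memory halves that to 1024 MiB.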

Inefficient Queries
While Spark’s Catalyst engine tries to optimize a query as much as possible, it can’t help if the query itself is badly written. A common example is selecting all the columns of a Parquet/ORC table. As seen in the previous section, each column needs some in-memory column batch state, so the more columns are selected, the higher the overhead. Try to read as few columns as possible. Try to use filters wherever possible, so that less data is fetched to the executors. Some data sources support partition pruning. If your query can be rewritten to filter on partition column(s), it will reduce data movement to a large extent.
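As an illustration of reading fewer columns and filtering on a partition column, here is a minimal sketch that assumes Spark SQL is on the classpath. The data set, the column names (`id`, `kind`, `payload`, `region`), and the temporary output directory are all made up for the example, and `local[*]` is used only so it runs without a cluster.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical example; table layout and names are assumptions.
object PruningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pruning-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Write a tiny Parquet data set partitioned by the "region" column.
    val path = Files.createTempDirectory("events").resolve("parquet").toString
    Seq(
      (1L, "click", "large-payload-a", "eu"),
      (2L, "view",  "large-payload-b", "us")
    ).toDF("id", "kind", "payload", "region")
      .write.partitionBy("region").parquet(path)

    // Filter on the partition column and select only the columns needed,
    // instead of reading every column of every partition.
    val slim = spark.read.parquet(path)
      .where(col("region") === "eu")
      .select("id", "kind")

    // The physical plan lists PartitionFilters and the ReadSchema actually used.
    slim.explain()
    slim.show()
    spark.stop()
  }
}
```

Checking the plan with `explain()` is a quick way to confirm that the filter reached the scan and that the wide `payload` column is no longer being read.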

Incorrect Configuration
Incorrect configuration of memory and caching can also cause failures and slowdowns in Spark applications. Let’s look at some examples.

Executor and Driver Memory
Each application’s memory requirement is different. Depending on the requirement, each app has to be configured differently. You should ensure the values in spark.executor.memory or spark.driver.memory are correct, depending on the workload. As obvious as it may seem, this is one of the hardest things to get right. We need the help of tools to monitor the actual memory usage of the application.

Memory Overhead
Sometimes it's not the executor memory but the YARN container memory overhead that causes OOM, or that gets the container killed by YARN. “YARN kill” messages typically look like this:

[pid=<pid>,containerID=<container_ID>] is running beyond physical memory limits. Current usage: 1.5 GB of 1.5 GB physical memory used; 4.6 GB of 3.1 GB virtual memory used. Killing container.

YARN runs each Spark component, such as executors and drivers, inside containers. Overhead memory is the off-heap memory used for JVM overheads, interned strings, and other metadata in the JVM. In this case, you need to set spark.yarn.executor.memoryOverhead (named spark.executor.memoryOverhead since Spark 2.3) to a proper value. Typically, about 10 percent of total executor memory should be allocated for overhead; Spark's own default is 10 percent with a minimum of 384 MB.
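The sizing rule can be made concrete with a short calculation. The plain-Scala sketch below uses Spark's documented defaults (a factor of 0.10 and a 384 MiB floor); `ContainerSizing` and its method names are hypothetical, and the result ignores any rounding YARN applies to container requests.

```scala
// Hypothetical helper, not a Spark API: estimates the container request.
object ContainerSizing {
  val MinOverheadMiB: Long = 384L   // documented minimum overhead
  val OverheadFactor: Double = 0.10 // documented default factor

  // Overhead is the larger of 10 percent of executor memory and the floor.
  def overheadMiB(executorMemoryMiB: Long): Long =
    math.max((executorMemoryMiB * OverheadFactor).toLong, MinOverheadMiB)

  // Memory requested from YARN: executor heap plus overhead.
  def containerMiB(executorMemoryMiB: Long): Long =
    executorMemoryMiB + overheadMiB(executorMemoryMiB)
}
```

For an 8192 MiB executor the overhead is 819 MiB and the container request is 9011 MiB. For a 1024 MiB executor the 384 MiB floor applies, giving 1408 MiB. If the off-heap usage of a job exceeds this allowance, the overhead setting has to be raised explicitly.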

Caching Memory
If your application uses Spark caching to store some datasets, then it’s worthwhile to consider Spark’s memory manager settings. Spark’s memory manager is written in a very generic fashion to cater to all workloads. Hence, there are several knobs to set it correctly for a particular workload.

Spark divides its memory requirements into two types:

  • Execution
  • Storage

Storage memory is used for caching, and execution memory is acquired for temporary structures such as the hash tables used in aggregations and joins. Both are drawn from a configurable fraction of the heap, set by spark.memory.fraction, which defaults to 60 percent. Of that fraction, 50 percent by default (configurable through spark.memory.storageFraction) is assigned to storage, and the rest is assigned to execution.
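To see what these fractions mean for a concrete heap, the following plain-Scala sketch works through the numbers. It follows Spark's tuning guide, which applies spark.memory.fraction to the heap after setting aside 300 MiB of reserved memory; `UnifiedMemory`, `Pools`, and `pools` are hypothetical names, and the 4096 MiB heap in the usage note is an assumed value.

```scala
// Hypothetical calculator, not a Spark API.
object UnifiedMemory {
  // Reserved by Spark before the fraction is applied (per the tuning guide).
  val ReservedMiB: Double = 300.0

  final case class Pools(unifiedMiB: Double, storageMiB: Double, executionMiB: Double)

  def pools(heapMiB: Double,
            memoryFraction: Double = 0.6,   // spark.memory.fraction default
            storageFraction: Double = 0.5   // spark.memory.storageFraction default
           ): Pools = {
    val unified = (heapMiB - ReservedMiB) * memoryFraction
    val storage = unified * storageFraction
    Pools(unified, storage, unified - storage)
  }
}
```

With the defaults, `UnifiedMemory.pools(4096)` yields a unified region of about 2277.6 MiB, split into roughly 1138.8 MiB each for storage and execution. These are starting sizes only, since the two pools can borrow from each other at runtime.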

Each of these pools, execution and storage, may borrow from the other when the other has free memory. Also, storage memory that was borrowed from execution can be evicted, down to a limit. Without going into those complexities, we can configure our program so that cached data that fits in storage memory does not cause a problem for execution. If we don’t need all our cached data to sit in memory, we can set spark.memory.storageFraction to a lower value so that the extra data is evicted and execution does not face memory pressure.

Out of Memory at NodeManager
Spark applications that shuffle data as part of 'group by' or 'join'-like operations incur significant overhead. Normally, shuffle data is served by the executor process itself. If the executor is busy or under heavy GC load, it can’t serve shuffle requests. This problem is alleviated to some extent by using an external shuffle service.

External shuffle services run on each worker node and handle shuffle requests from executors. Executors read shuffle files from this service rather than from each other, which lets the requesting executors read shuffle files even if the producing executors are killed or slow. Also, enabling dynamic allocation has traditionally required an external shuffle service. When Spark's external shuffle service is configured with YARN, the NodeManager starts an auxiliary service that acts as the external shuffle service provider. By default, NodeManager memory is around 1 GB, so applications that shuffle heavily might fail because the NodeManager runs out of memory. It's imperative to size your NodeManager memory appropriately if your applications fall into this category.
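On the application side, the two settings discussed here can be sketched as follows, assuming Spark is on the classpath. `ShuffleServiceConf` is a hypothetical wrapper name. The snippet covers only the Spark properties; registering the auxiliary service and sizing the NodeManager heap are done in the YARN configuration, which is outside this sketch.

```scala
import org.apache.spark.SparkConf

// Hypothetical wrapper; only the two property keys come from Spark itself.
object ShuffleServiceConf {
  def build(): SparkConf =
    new SparkConf()
      // Fetch shuffle files through the node-level shuffle service.
      .set("spark.shuffle.service.enabled", "true")
      // Let Spark add and remove executors as the workload changes.
      .set("spark.dynamicAllocation.enabled", "true")
}
```

The same keys can equally be passed with `--conf` on spark-submit or placed in spark-defaults.conf; the Scala form is shown only to stay consistent with the other examples.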
